package com.it.cloud.stream;

import java.util.Date;
import java.util.Properties;
import org.apache.commons.lang.time.DateFormatUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;

/**
 * Kafka Streams example: IP access filtering.
 * Builds a real-time blacklist from the "access-log" topic by counting
 * requests per IP inside one-minute tumbling windows and flagging every IP
 * whose count reaches the threshold (>= 2 hits per window).
 */
public class BlackListChecker {

	/** Source topic carrying raw access-log records (value = client IP). */
	private static final String SOURCE_TOPIC = "access-log";

	/** Name of the windowed-count state store. */
	private static final String COUNT_STORE = "access-count";

	/** Window size and advance in milliseconds — equal values make a tumbling one-minute window. */
	private static final long WINDOW_MS = 60 * 1000L;

	/** Minimum hits within a single window that puts an IP on the blacklist. */
	private static final long BLACKLIST_THRESHOLD = 2L;

	/**
	 * Builds the Streams configuration shared by both variants
	 * (was previously duplicated verbatim in each method).
	 *
	 * @return fully populated Kafka Streams {@link Properties}
	 */
	private static Properties streamsConfig() {
		Properties props = new Properties();
		props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ip-blacklist-checker");
		props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
		// Key serializer/deserializer class
		props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
		// Value serializer/deserializer class
		props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
		// Start from the EARLIEST offset when no committed position exists.
		// (The original comment said "latest", contradicting this value.)
		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
		// How often the current position is committed, default 30s
		props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "1000");
		// Poll interval against the source topic, default 100ms
		props.put(StreamsConfig.POLL_MS_CONFIG, "10");
		return props;
	}

	/**
	 * Real-time blacklist computation, Java 8 lambda style.
	 */
	public static void checkBlackListOfJava8() {
		KStreamBuilder builder = new KStreamBuilder();
		// Read the access-log topic
		KStream<String, String> accessLog = builder.stream(SOURCE_TOPIC);
		// Records arrive without a key; re-key each record by its value
		// (the IP) so we can group by IP
		accessLog.map((key, value) -> new KeyValue<>(value, value))
				// Group by IP
				.groupByKey()
				// Count per one-minute tumbling window; the count result is a
				// KTable backed by the COUNT_STORE state store
				.count(TimeWindows.of(WINDOW_MS).advanceBy(WINDOW_MS), COUNT_STORE)
				// Convert back to a KStream of (windowed IP, count)
				.toStream()
				// Keep only IPs at or above the blacklist threshold
				.filter((Windowed<String> key, Long value) -> null != value && value >= BLACKLIST_THRESHOLD)
				// Handle matched records; pass the store name like the Java 7
				// variant does so the processor can access the store if needed
				.process(IpBlackListProcessor::new, COUNT_STORE);

		// Build the topology and start processing
		KafkaStreams streams = new KafkaStreams(builder, streamsConfig());
		streams.start();
	}

	/**
	 * Real-time blacklist computation, Java 7 anonymous-class style.
	 */
	public static void checkBlackListOfJava7() {
		KStreamBuilder builder = new KStreamBuilder();
		KStream<String, String> accessLog = builder.stream(SOURCE_TOPIC);
		// Records arrive without a key; re-key each record by its value
		// (the IP) so we can group by IP
		accessLog.map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
					@Override
					public KeyValue<String, String> apply(String key, String value) {
						return new KeyValue<String, String>(value, value);
					}
				})
				.groupByKey()
				// Count per one-minute tumbling window;
				// COUNT_STORE names the backing state store
				.count(TimeWindows.of(WINDOW_MS).advanceBy(WINDOW_MS), COUNT_STORE)
				// Convert back to a KStream of (windowed IP, count)
				.toStream()
				// Apply the blacklist rule
				.filter(new Predicate<Windowed<String>, Long>() {
					@Override
					public boolean test(Windowed<String> key, Long value) {
						System.out.println("请求时间："+DateFormatUtils.format(new Date(), "HH:mm:ss")+",IP:"+key.key()+",请求次数:"+value);
						// Records for which we return false are dropped
						return null != value && value.longValue() >= BLACKLIST_THRESHOLD;
					}
				})
				// Handle matched records with the custom processor
				.process(new ProcessorSupplier<Windowed<String>, Long>() {
					@Override
					public Processor<Windowed<String>, Long> get() {
						// Custom Processor that acts on blacklisted IPs
						return new IpBlackListProcessor();
					}
				}, COUNT_STORE);

		// Build the topology and start processing
		KafkaStreams streams = new KafkaStreams(builder, streamsConfig());
		streams.start();
	}

	public static void main(String[] args) {
		checkBlackListOfJava7();
	}
}
